En esta competencia se propone crear clasificadores de intensidades de emociones: baja, media y alta, para 4 tipos de emociones diferentes: ira, miedo, alegría y tristeza, a partir de un dataset de tweets respectivamente clasificados según emoción e intensidad. Para esto se deben aplicar los conocimientos aprendidos en el curso para el tratamiento de los datos y en caso de tener, conocimientos sobre los modelos de aprendizajes obtenidos en otros cursos como por ejemplo Deep Learning o Minería de Datos. La competencia acepta 4 submissions en donde en cada submission se deben enviar las predicciones de intensidad que arrojaron los clasificadores para cada emoción. Estas predicciones deben contener el resultado de AUC, Kappa y Accuracy para cada tweet evaluado. El resultado de cada submission será evaluado como el promedio de los resultados obtenido para las predicciones de cada emoción.
Consideramos tres aproximaciones para la representacion de un tweet. (1) Diseño manual de atributos, (2) Word embeddings y (3) Mix entre (1) y (2). En lo que sigue detallaremos las representaciones (1) y (2).
(1) Diseño manual de atributos:
Para esto nos enfocamos en construir seis tipos de atributos para el tweet. Esto son: (1.1) Retro, (1.2) Simbolos de puntuacion, (1.3) Emojis, (1.4) Linguisticos, (1.5) n-gramas, (1.6) Lexicones. En lo que sigue, se detallaran cada uno de estos tipos de atributos junto al codigo utilizado para su extraccion.
(1.1) Atributos retro: Consisten en trece atributos tradicionales para la representaciones del texto, p.ej. cantidad de tokens. Para este tipo de atributos, diseñamos los siguientes:
def get_retro_attrib(tweet):
o = {}
o["retro<&>num_tokens"] = len(tweet.split())
o["retro<&>lenght"] = len(" ".join(tweet.split()))
o["retro<&>num_numbs"] = len(re.findall(r"\d+",tweet))
o["retro<&>num_alpha"] = len(re.findall(r"\w+", tweet))
o["retro<&>num_with_uppercase"] = len(re.findall(r"\S*[A-Z]+\S*", tweet))
o["retro<&>num_tokens_upper"] = sum(int(t.isupper()) for t in tweet.split())
def prop_vowels(w):
N = len(w.replace(" ", ""))
if N>0:
return len(re.findall(r"[aeiou]", tweet)) / N
else:
return 0
def len_max_rep_char(w):
w=w+" "
c0 = w[0]
lens = [0]
clen = 1
for c in w[1:]:
if c == c0:
clen += 1
else:
if c0.isalpha():
lens.append(clen)
c0 = c
clen = 1
return max(lens)
o["retro<&>prop_vowels"] = prop_vowels(tweet.lower())
o["retro<&>len_max_rep_char"] = len_max_rep_char(tweet.lower())
def max_char_fre_per_token(w, c="k"):
tw = w.split()
fmax = 0
for t in tw:
f = sum(int(ch==c) for ch in t)
if f>fmax:
fmax = f
return fmax
o["retro<&>max_char_fre_per_token(o)"] = max_char_fre_per_token(tweet.lower(), c="o")
o["retro<&>max_char_fre_per_token(s)"] = max_char_fre_per_token(tweet.lower(), c="s")
o["retro<&>max_char_fre_per_token(g)"] = max_char_fre_per_token(tweet.lower(), c="g")
o["retro<&>max_char_fre_per_token(l)"] = max_char_fre_per_token(tweet.lower(), c="l")
def max_type_rep_char_per_token(w, t="vowel"):
w=unidecode(w+" ")
c0 = w[0]
lens = [0]
clen = 1
for c in w[1:]:
if (c0.isalpha() and c.isalpha()) and ((c in "aeiou" and c0 in "aeiou") or (c not in "aeiou" and c0 not in "aeiou")):
clen += 1
else:
if t=="vowel":
if c0 in "aeiou":
lens.append(clen)
else:
if c0 not in "aeiou":
lens.append(clen)
c0 = c
clen = 1
return max(lens)
o["retro<&>max_type_rep_char_per_token(vowel)"] = max_type_rep_char_per_token(tweet.lower(), t="vowel")
return o
(1.2) Atributos para simbolos de puntuacion: Consisten en siete atributos tradicionales que se enfocan en ver patrones en los simbolos de puntacion del texto. Para este tipo de atributos, diseñamos los siguientes:
def get_punct_attrib(tweet):
o = {}
o["punct<&>[\.]{3}"] = len(re.findall(r"[\.]{3}", tweet))
o["punct<&>[!]"] = len(re.findall(r"[!]", tweet))
o["punct<&>[#]"] = len(re.findall(r"[#]", tweet))
o["punct<&>[#]{1}\S+"] = len(re.findall(r"[#]{1}\S+", tweet))
o["punct<&>[\*]"] = len(re.findall(r"[\*]", tweet))
o["punct<&>[@]{1}\S+"] = len(re.findall(r"[@]{1}\S+", tweet))
o["punct<&>\S*[?]{1}\S*"] = len(re.findall(r"\S*[?]{1}\S*", tweet))
return o
(1.3) Atributos para emojis: Consisten en atributos que cuentan la cantidad de emojis unicos en el texto. Para esto utilizamos la libreria emojilib. Los atributos son obtenidos como sigue:
def get_emojilib_attrib(tweet):
emo_list = emojilib.emoji_list(tweet)
emo_names = list([d['name'] for d in emo_list if 'name' in d])
o = {}
for emo in emo_names:
if emo not in o.keys():
o["emoji<&>"+emo] = 0
o["emoji<&>"+emo] += 1
return o
(1.4) Atributos linguisticos: Consisten en atributos que utilizan conocimiento linguistico para capturar propiedades de interes en el texto. Estos atributos cuentan tipos de tokens en el tweet, esto son: (lemma) palabras lemmatizadas, (pos) etiquetas part-of-speech (POS), p.ej. pos/VERB. (tag) etiquetas detalladas de POS, p.ej. tag/IN. (dep) etiquetas de dependencia, p.ej. dep/ROOT. (shape) forma del token, p.ej. [I, am, 22]->[X, xx, dd]. (is_alpha) Indicador de tokens alfabeticos, p.ej. [I, am, 22]->[1, 1, 0]. (is_stop) Indicador de tokens stop-word, p.ej. [I, am, 22, years, old]->[1, 1, 0, 0, 0]. En particular, bag-of-word (BOW) es un sub-conjunto de los atributos linguisticos. Los atributos son obtenidos con la libreria Spacy como sigue:
def get_linguistics_attrib(tweet):
o = {}
nlp_tweet = nlp(tweet)
for token in nlp_tweet:
label = "text lemma pos tag dep shape is_alpha is_stop".split()
vals = [token.text, token.lemma_.lower(), token.pos_, token.tag_, token.dep_, token.shape_, token.is_alpha, token.is_stop]
dict_vals = dict(zip(label[1:], vals[1:]))
for k, v in dict_vals.items():
l = f"linguistics<&>{k}<&>{v}"
if l not in o.keys():
o[l] = 0
o[l] += 1
return o
(1.5) Atributos con n-gramas: Consisten en atributos que cuentan n-gramas del tweet. En nuestros experimentos solo utilizamos n=2 y n=3. Los atributos son obtenidos como sigue:
def get_n_grams_attrib(tweet):
o = {}
nlp_tweet = nlp(tweet)
def not_stop(tup: tuple) -> bool:
for element in tup:
if element.is_stop:
return False
return True
bi_tokens = [(w[0].lemma_.lower(), w[1].lemma_.lower()) for w in bigrams(nlp_tweet) if not_stop(w)]
for bigram in bi_tokens:
ling = f"linguistics<&>bigram<&>{bigram}"
if ling not in o.keys():
o[ling] = 0
o[ling] += 1
tri_tokens = [(w[0].lemma_.lower(), w[1].lemma_.lower(), w[2].lemma_.lower()) for w in trigrams(nlp_tweet) if not_stop(w)]
for trigram in tri_tokens:
ling = f"linguistics<&>trigram<&>{trigram}"
if ling not in o.keys():
o[ling] = 0
o[ling] += 1
return o
(1.6) Atributos con lexicones: Consisten en atributos que cuentan la cantidad de tipos de tokens en diccionarios estandarizados de palabras. Para esto seleccionamos dos diccionarios: (1) Liu-Hu, con palabras positivas y negativas, y (2) Senti-wordnet, con palabras positivas, negativas y objetivas. Los atributos son obtenidos como sigue:
def get_lexicon_attrib(tweet):
o = {}
o["lexicon<&>LiuHu<&>+"] = sum(int(t.lower() in opinion_lexicon.positive()) for t in tweet.split())
o["lexicon<&>LiuHu<&>-"] = sum(int(t.lower() in opinion_lexicon.negative()) for t in tweet.split())
o_sentiwordnet = get_sentiwordnet_sent(tweet)
o["lexicon<&>sentiwordnet<&>+"] = o_sentiwordnet["+"]
o["lexicon<&>sentiwordnet<&>-"] = o_sentiwordnet["-"]
o["lexicon<&>sentiwordnet<&>o"] = o_sentiwordnet["o"]
return o
# Obs: La funcion get_sentiwordnet_sent es una adaptacion de la propuesta en https://nlpforhackers.io/sentiment-analysis-intro/
(2) Word embeddings:
Para esto utilizamos el modelo del lenguaje BERT pre-entrenada con tweets en ingles denominado BERTweet. Por simplicidad, la representacion de un tweet con BERTweet (o sentence embedding) sera el promedio de los word embeddings entre cada token del tweet. Cada tweet sera un vector de dimension 768. En lo que sigue se detallara como cargar el modelo y obtener el sentence embedding del tweet.
Cargar BERTweet:
import torch
from transformers import AutoModel, AutoTokenizer
bertweet = AutoModel.from_pretrained("vinai/bertweet-base")
tokenizer = AutoTokenizer.from_pretrained("vinai/bertweet-base", use_fast=False, normalization=True)
Obtener sentence embedding:
input_ids = torch.tensor([tokenizer.encode(tweet)])
with torch.no_grad():
outputs = bertweet(input_ids)
hidden_states = outputs[0]
token_embeddings = np.array([ll.numpy() for ll in hidden_states[0]])
sentence_embedding = np.mean(token_embeddings, axis=0)
En nuestros experimentos consideraremos siete colecciones de atributos. Estas son:
A1 Atributos simples: Retro + Simbolos de puntuacion + Emojis + Linguisticos
A2 BERTweet
A3 Mix: Atributos simples + BERTweet
A4 Mix + Lexicones
A1* Atributos simples + n-gramas
A3* Mix + n-gramas
A4* Mix + Lexicones+ n-gramas
Para la seleccion de atributos, utilizamos chi2 para rankear cada atributo y un clasficador Support-vector machine (SVC) para estimar la capacidad predictiva de un modelo de clasificacion con los primeros k-atributos mejores rankeados. Se eligen aquellos atributos que obtengan mejor F1-score promedio en el conjunto de testeo luego de realizar 5-fold cross validation. Como chi2 es un selector de atributos binario y el problema de clasificacion de intensidades es uno ternario, consideramos la siguiente heuristica para rankear un atributo: El puntaje de un atributo es el valor maximo segun chi2 cuando consideramos las clases low-medium, low-high y medium-high, i.e. un atributo es mejor que otro si separa linealmente aquellos tweets en algun par de intensidades. En lo que sigue detallaremos la seleccion de atributos por emocion:
def feature_selection_chi2(sen, df_rep, df_train, n0=10):
"""
funcion que selecciona los mejores atributos data una emocion (sen), representacion de los tweets (df_rep)
y conjunto de entrenamiento (df_train). Para esto, primero se rankean los atributos con chi2 para el caso
ternario y luego se seleccionan los primeros k-atributos mejores rankeados con k desde n0 hasta la cantidad
total de atributos con pasos de 10. Finalmente, se encuentra el mejor k segun un SVC y F1-score promedio con
5-fold cv.
output: Entrega el valor de k optimo y los atributos ordenados desde el segun relevancia.
"""
indexs = df_train[df_train["sen"] == sen].index
scores_selector = {col: [] for col in df_rep.columns.tolist()}
# Calcular chi2 para cada par de clases: low-medium, low-high y medium-high
for l1 in intensities:
for l2 in intensities:
if l1 < l2:
indexsLH = df_train.loc[indexs][(df_train.loc[indexs]["int"].isin([l1, l2]))].index
X = df_rep.loc[indexsLH]
y = df_train.loc[X.index]["int"]
# Balance de datos con over-sampling
dic_label_count = y.value_counts().to_dict()
min_label = min(dic_label_count.items(), key=lambda x: x[1])[0]
max_label = max(dic_label_count.items(), key=lambda x: x[1])[0]
index_label_1 = y[y==min_label].index
oversampling_steps = int(dic_label_count[max_label] / dic_label_count[min_label]) - 1
X_res, y_res = X.copy(), y.copy()
for step in range(oversampling_steps):
new_indexs = [f"{ix}+{step + 1}" for ix in index_label_1]
copied_sub_X = pd.DataFrame(X.loc[index_label_1].values, columns=X.columns, index=new_indexs)
copied_sub_y = pd.Series(y.loc[index_label_1].values, index=new_indexs)
X_res = pd.concat([X_res, copied_sub_X], axis=0)
y_res = pd.concat([y_res, copied_sub_y], axis=0)
X_res = pd.DataFrame(StandardScaler().fit_transform(X_res), columns=X_res.columns, index=X_res.index)
selector = SelectKBest(chi2, k=X.shape[1])
X_res_ = X_res - X_res.min()
selector.fit(X_res_, y_res)
for i, col in enumerate(X.columns.tolist()):
scores_selector[col] += [selector.scores_[i]] if str(selector.scores_[i]) != "nan" else [0]
# Atributos rankeados segun el maximo-chi2
ranked_cols = [x[0] for x in sorted(scores_selector.items(), key=lambda y: max(y[1]), reverse=True)]
# Seleccion del k candidato a optimo
f1_weight = []
b = []
p = []
pp = []
ppp = []
for num_cols in range(n0, len(ranked_cols)+1, 10):
X = df_rep.loc[indexs][ranked_cols[:num_cols+1]]
y = df_train.loc[X.index]["int"]
clf = make_pipeline(StandardScaler(), SVC(kernel="rbf", gamma='auto', class_weight="balanced"))
cv_results = cross_validate(clf, X, y, cv=5, scoring="f1_weighted")
test_score = cv_results["test_score"]
f1_weight.append([num_cols, np.mean(test_score), np.std(test_score)])
# greedy-early-stopping
b.append(f1_weight[-1][1])
p.append(np.mean(b[max(0, len(b)-15-1):]))
pp.append(p[-1]-p[-min(2, len(p))])
ppp.append(np.mean(pp[max(0, len(pp)-30-1):]))
pppc = np.mean([int(x<0) for x in ppp[max(0, len(ppp)-5-1):]])
if int(pppc) == 1:
break
best_f1 = sorted(f1_weight, key=lambda x: x[1], reverse=True)[0][0]
# Seleccion fina del k optimo
fine_f1_weight = []
for num_cols in range(best_f1-10, best_f1+10, 1):
X = df_rep.loc[indexs][ranked_cols[:num_cols+1]]
y = df_train.loc[X.index]["int"]
clf = make_pipeline(StandardScaler(), SVC(kernel="rbf", gamma='auto', class_weight="balanced"))
cv_results = cross_validate(clf, X, y, cv=5, scoring="f1_weighted")
test_score = cv_results["test_score"]
fine_f1_weight.append([num_cols, np.mean(test_score), np.std(test_score)])
print(fine_f1_weight[-1])
fine_best_f1 = sorted(fine_f1_weight, key=lambda x: x[1], reverse=True)[0]
return fine_best_f1, ranked_cols
Por simplicidad consideramos solo tres modelos de clasificacion multi-clase disponibles en la libreria sklearn. Esto son:
Para cada modelo y emoción, se calcularon las siguientes métricas:
AUC: Área bajo la curva ROC, que es una representación gráfica de la proporción de verdaderos positivos (VPR) frente a la proporción de falsos positivos (FPR) según se varía el umbral de discriminación, que es valor a a partir del cual se decide si un caso es positivo.
Kappa: Normalización de la precisión según el desequilibrio en las clases de datos. De esta manera se tienen en cuenta posibles sesgos a una clase mayoritaria.
Accuracy: El porcentaje de clasificaciones correctas.
Para obtenerlas, se realizaron al menos cien iteraciones a cada modelo, en las que el 80% del set de datos, escogido de manera aleatoria, se utilizó para fines de entrenamiento, mientras que el 20% restante se utilizó para testeo. Finalmente, cada métrica se obtuvo del promedio simple de las iteraciones.
...
import pandas as pd
import pickle
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
agg_summaries = pickle.load(open("agg_summaries.pickle", "rb"))
for sen in ['anger', 'fear', 'joy', 'sadness']:
fig = make_subplots(rows=1, cols=3, subplot_titles=("AUC", "Kappa", "Accuracy"))
for k, metric in enumerate(['auc', 'kappa', 'accuracy']):
symbols = "circle square circle circle square circle square".split()
agg_summaries_mlp = agg_summaries["MLP"]
means = [v[sen][metric][0] for v in agg_summaries_mlp.values()]
stds = [v[sen][metric][1] for v in agg_summaries_mlp.values()]
fig.add_trace(
go.Scatter(
x=list(agg_summaries_mlp.keys()),
y=means,
marker_symbol=symbols,
error_y=dict(
type='data',
symmetric=True,
array=stds,
thickness=1
),
marker=dict(color="green",size=5), name="MLP"+" "+metric
),
row=1, col=k+1,
)
agg_summaries_rf = agg_summaries["RF"]
means = [v[sen][metric][0] for v in agg_summaries_rf.values()]
stds = [v[sen][metric][1] for v in agg_summaries_rf.values()]
fig.add_trace(
go.Scatter(
x=list(agg_summaries_rf.keys()),
y=means,
marker_symbol=symbols,
error_y=dict(
type='data',
symmetric=True,
array=stds,
thickness=1
),
marker=dict(color="red",size=5), name="RF"+" "+metric
),
row=1, col=k+1,
)
agg_summaries_svc = agg_summaries["SVC"]
means = [v[sen][metric][0] for v in agg_summaries_svc.values()]
stds = [v[sen][metric][1] for v in agg_summaries_svc.values()]
fig.add_trace(
go.Scatter(
x=list(agg_summaries_svc.keys()),
y=means,
marker_symbol=symbols,
error_y=dict(
type='data',
symmetric=True,
array=stds,
thickness=1
),
marker=dict(color="blue",size=5), name="SVC"+" "+metric
),
row=1, col=k+1,
)
fig.update_layout(height=300, width=900, title_text=f"Métricas por colección de atributos para la emoción <b>{sen}", showlegend=True)
fig.show(renderer='notebook')
Submission 1
| Sentimiento | Atributos |
|---|---|
| anger | A3 |
| fear | A3 |
| joy | A3 |
| sadness | A2 |
Submission 2
| Sentimiento | Atributos |
|---|---|
| anger | A4* |
| fear | A4* |
| joy | A1* |
| sadness | A2 |
results = pickle.load(open("results.pickle", "rb"))
for metric in ["AUC", "Kappa", "Accuracy"]:
fig = go.Figure(data=[
go.Bar(name='Baseline', x=['anger', 'fear', 'joy', 'sadness'], y=results_baseline[metric]),
go.Bar(name='Submission 1', x=['anger', 'fear', 'joy', 'sadness'], y=results_s1[metric], text=["A3", "A3", "A3", "A2"], textfont_color="black"),
go.Bar(name='Submission 2', x=['anger', 'fear', 'joy', 'sadness'], y=results_s2[metric], text=["A4*", "A4*", "A1*", "A2"], textfont_color="black")
])
fig.update_layout(barmode='group', height=400, width=900, title_text=f"<b>{metric}</b> del equipo por sentimiento", showlegend=True)
fig.show(renderer='notebook')
fig.write_html("test.html",
full_html=False,
include_plotlyjs='cdn')
...